Skip to content

Conversation

desertaxle
Copy link
Contributor

@desertaxle desertaxle commented Oct 2, 2025

Summary

This PR introduces an integration with Prefect to enable durable, fault-tolerant execution of Pydantic AI agents.

Examples

Run a pydantic-ai agent as a Prefect flow

from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

# Existing agent
agent = Agent('gpt-4o', name='my_agent')

# Enable durable execution
prefect_agent = PrefectAgent(agent)

# Use as normal, now run as a Prefect flow with model call and tool call tasks
result = await prefect_agent.run('What is the capital of Mexico?')

print(result.output) # The capital of Mexico is Mexico City

The PrefectAgent class wraps an Agent and instruments it so that .run calls are executed as a Prefect flow, and all tool and model calls are executed as Prefect tasks.

Customize underlying task behavior

prefect_agent = PrefectAgent(
    agent,
    model_task_config=TaskConfig(
        retries=3,
        retry_delay_seconds=[1.0, 2.0, 4.0],
        timeout_seconds=30.0,
    ),
)

Create an agent service to enable scheduling and remote execution

# Deploy agent with scheduled execution
from prefect import flow
from prefect.deployments import run_deployment
from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

agent = Agent(
    'openai:gpt-4o',
    name='daily_report_agent',
    instructions='Generate a daily summary report.',
)

prefect_agent = PrefectAgent(agent)

@flow
async def daily_report_flow(user_prompt: str):
    """Generate a daily report using the agent."""
    result = await prefect_agent.run(user_prompt)
    return result.output

# Serve the flow with a daily schedule
if __name__ == '__main__':
    daily_report_flow.serve(
        name='daily-report-deployment',
        cron='0 9 * * *',  # Run daily at 9am
        parameters={'user_prompt': "Generate today's report"},
        tags=['production', 'reports'],
    )

Implementation Details

The integration is implemented through:

  • PrefectAgent: Wrapper class that extends WrapperAgent to intercept and wrap agent operations
  • PrefectModel: Model wrapper that executes requests as Prefect tasks with event streaming support
  • PrefectFunctionToolset: Wraps tool calls as tasks while preserving type safety
  • A custom Prefect cache policy to ensure resumability of agent runs

Copy link
Contributor

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple small thoughts

@desertaxle desertaxle marked this pull request as ready for review October 7, 2025 19:24
@DouweM DouweM self-assigned this Oct 8, 2025
pytest.mark.anyio,
pytest.mark.vcr,
pytest.mark.xdist_group(name='prefect'),
pytest.mark.filterwarnings('ignore:Found propagated trace context.*:RuntimeWarning'),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefect populates a traceparent header, so Logfire is raising a warning. Looks like I can handle that by providing a value to distributed_tracing, but I might need to make a modification to the capfire fixture to allow that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize the capfire fixture is from logfire. I think I'll need to filter that warning in one test.

@desertaxle
Copy link
Contributor Author

Thanks for the review @DouweM! This PR is ready for another review when you get the chance!

* Wraps [tool calls](../tools.md) as Prefect tasks (configurable per-tool).
* Wraps [MCP communication](../mcp/client.md) as Prefect tasks.

Event stream handlers are **not automatically wrapped** by Prefect. If they involve I/O or non-deterministic behavior, you can explicitly decorate them with `@task` from Prefect.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we link to the section in agents.md that introduces event stream handlers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a link in a941da4 to what I think is the right page and section, but please correct me if I'm wrong.

* **Prefect**: Workflow-level orchestration, task status, and retry history
* **Logfire**: Fine-grained tracing of agent runs, model requests, and tool invocations

When using Logfire with Prefect, you can enable distributed tracing to see spans for your Prefect runs included with your agent runs, model requests, and tool invocations.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Prefect need any special setup to sent OTel data to Logfire?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! As long as Logfire is configured, it'll send OTel data automatically.

from pydantic_ai.durable_exec.prefect._cache_policies import DEFAULT_PYDANTIC_AI_CACHE_POLICY


class TaskConfig(TypedDict, total=False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no such type in Prefect? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, there is not. We have an open issue to expose something like this (PrefectHQ/prefect#14851), which provides good motivation to implement that type natively. I'll try to get this into our next release and update this PR, or create a new one if this one has already been merged.

@desertaxle desertaxle requested a review from DouweM October 13, 2025 14:41
@DouweM DouweM merged commit 89381a2 into pydantic:main Oct 13, 2025
38 checks passed
DouweM added a commit that referenced this pull request Oct 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants